package com.gxlevi.main

import com.gxlevi.bean.{WebLogBean, WebLogBeanCase}
import com.gxlevi.service.PageViewService
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}

import scala.collection.mutable

/**
 * Web-log cleansing driver: reads raw access-log lines, parses them into
 * [[WebLogBean]]s, drops invalid records and static-resource requests, and
 * hands the cleansed RDD to [[PageViewService]] for page-view output.
 */
object OpitmizeApp {

  // Static-resource request paths to exclude from the cleansed data.
  // Kept as a public mutable.HashSet to preserve the existing object interface;
  // populated once by initlizePages() before broadcasting.
  val pages = new mutable.HashSet[String]()

  /**
   * Fills `pages` with the static-resource paths that should be filtered out.
   * Side-effecting, so callers should invoke it with parentheses.
   */
  def initlizePages(): Unit = {
    pages.add("/about")
    pages.add("/black-ip-list/")
    pages.add("/cassandra-clustor/")
    pages.add("/finance-rhive-repurchase/")
    pages.add("/hadoop-family-roadmap/")
    pages.add("/hadoop-hive-intro/")
    pages.add("/hadoop-zookeeper-intro/")
    pages.add("/hadoop-mahout-roadmap/")
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val spark = SparkSession.builder()
      .master("local[*]")
      .config(conf)
      .appName(OpitmizeApp.getClass.getName)
      .getOrCreate()

    // Ensure Spark resources are released even if the job throws.
    try {
      val sc: SparkContext = spark.sparkContext

      // Raw access-log lines -> parsed beans.
      val textRdd: RDD[String] = sc.textFile("/spark_etl/data/input3")
      val webLogBeanRdd: RDD[WebLogBean] = textRdd.map(WebLogBean(_))

      // Drop records that failed to parse (null) or were flagged invalid.
      val filterWebLogBeanRdd: RDD[WebLogBean] =
        webLogBeanRdd.filter(x => x != null && x.valid)

      // Broadcast the static-resource path set and keep only non-static requests.
      initlizePages()
      val pagesBroadCast: Broadcast[mutable.HashSet[String]] = sc.broadcast(pages)
      val filterStaticWebLogRdd: RDD[WebLogBean] =
        filterWebLogBeanRdd.filter(bean => !pagesBroadCast.value.contains(bean.request))

      import spark.implicits._
      // Dataset form of the cleansed log; consumed by the (currently disabled)
      // ODS-layer write below, so it is kept even though otherwise unused.
      val webLogBeanCaseDataset: Dataset[WebLogBeanCase] = filterStaticWebLogRdd.map(bean => WebLogBeanCase(
        bean.valid,
        bean.remote_addr,
        bean.remote_user,
        bean.time_local,
        bean.request,
        bean.status,
        bean.body_bytes_sent,
        bean.http_referer,
        bean.http_user_agent,
        bean.guid
      )).toDS()

      // Persist the cleansed data into the ODS layer (currently disabled).
      //webLogBeanCaseDataset.write.mode(SaveMode.Overwrite)
      //  .parquet("/user/hive/warehouse/itcast_ods.db/weblog_origin/dt=20200626")

      PageViewService.savePageViewToHdfs(filterStaticWebLogRdd)
    } finally {
      spark.stop() // fix: the session was previously never stopped (resource leak)
    }
  }
}
